package com.app.store;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import com.app.cache.CacheClient;
import com.app.config.DCacheConfig;
import com.app.db.DBPool;
import com.app.db.DaoUtil;
import com.app.exception.DCacheException;
import com.app.model.ColumnInfo;
import com.app.model.Condition;
import com.app.model.IndexEntry;
import com.app.model.TableInfo;
import com.app.util.StringUtils;


public abstract class AbstractMySqlStore extends AbstractStore {

	/**
	 * Creates the store, forwarding {@code type} unchanged to the {@code AbstractStore} constructor.
	 *
	 * @param type store type identifier handed to the superclass
	 */
	AbstractMySqlStore(int type) {
		super(type);
	}

	/** Connection pool obtained from {@code DBPool.getInstance()}; the methods of this class borrow their connections from it. */
	protected final DBPool pool = DBPool.getInstance();

	/** Table metadata looked up by table name; the SQL builders of this class read key and column names from it. */
	protected ConcurrentHashMap<String,TableInfo> tables = new ConcurrentHashMap<String, TableInfo>();
	
	/** NOTE(review): neither read nor written in this class; presumably an "initialization done" flag for subclasses - confirm. */
	protected boolean init;
	
	/**
	 * Initializes the store for the given tables. The behavior is defined entirely by subclasses.
	 *
	 * @param tableInfos descriptions of the tables to initialize
	 * @throws DCacheException if initialization fails
	 */
	public abstract void initTables(List<TableInfo> tableInfos)throws DCacheException;
	
	/**
	 * Returns table metadata keyed by name. The behavior is defined entirely by subclasses.
	 * NOTE(review): presumably exposes or refreshes the content of {@code tables} - confirm against implementations.
	 *
	 * @return map from table name to table metadata
	 * @throws DCacheException if the metadata cannot be produced
	 */
	public abstract Map<String,TableInfo> cacheTables() throws DCacheException;
	
	/**
	 * Reads the column metadata of a table by running {@code DESC} against it.
	 *
	 * <p>The result columns are consumed by position: 1 name, 2 type, 3 nullable flag,
	 * 4 key kind, 5 default value, 6 extra. The returned map preserves the order in which
	 * the database reported the columns.
	 *
	 * @param tableName table to describe; it is embedded in the statement between backticks
	 * @param conn      open connection to run the statement on; it is not closed here
	 * @return column metadata keyed by column name, in database order
	 * @throws SQLException if the statement cannot be prepared, executed or read
	 */
	protected Map<String,ColumnInfo> getColumnInfo(String tableName,Connection conn) throws SQLException {
		Map<String,ColumnInfo> columns = new LinkedHashMap<String,ColumnInfo>();
		PreparedStatement ps = null;
		ResultSet rs = null;
		try {
			ps = conn.prepareStatement("DESC `" + tableName + "`");
			rs = ps.executeQuery();
			while (rs.next()) {
				ColumnInfo ci = new ColumnInfo();
				ci.setName(rs.getString(1));
				ci.setType(rs.getString(2));
				// Constant-first comparisons: a NULL in the nullable/key columns must read as
				// "not set" instead of aborting the whole scan with a NullPointerException.
				ci.setNullable("YES".equalsIgnoreCase(rs.getString(3)));
				String keyKind = rs.getString(4);
				ci.setKey("PRI".equalsIgnoreCase(keyKind));
				ci.setIndex("MUL".equalsIgnoreCase(keyKind));
				ci.setDef(rs.getString(5));
				ci.setExtra(rs.getString(6));
				columns.put(ci.getName(), ci);
			}
		} finally {
			DaoUtil.close(rs);
			DaoUtil.close(ps);
		}
		return columns;
	}
	
	/**
	 * Writes one record change to MySQL on a caller-supplied connection by delegating to
	 * {@code putMySqlValue}. The connection is not closed by this method.
	 *
	 * @param tableName target table
	 * @param changed   column name to value pairs to write, in insertion order
	 * @param conn      open connection to run the statement on
	 * @throws DCacheException if the write fails
	 */
	public void putFromUpdateQueue(String tableName,LinkedHashMap<String,String> changed,Connection conn) throws DCacheException {
		
		putMySqlValue(tableName, changed,conn);
	}
	
	/**
	 * Deletes one row by key on a caller-supplied connection by delegating to
	 * {@code deleteMySqlValue}. The connection is not closed by this method.
	 *
	 * @param tableName table to delete from
	 * @param id        key value of the row to delete
	 * @param conn      open connection to run the statement on
	 * @throws DCacheException if the delete fails
	 */
	public void delFromDelQueue(String tableName,long id,Connection conn) throws DCacheException {
		deleteMySqlValue(tableName, id,conn);
	}
	
	/**
	 * Builds the parameterized statement that deletes a single row of a table by its key column.
	 *
	 * @param tableName table to delete from; its metadata must be present in {@code tables}
	 * @return SQL of the form {@code DELETE FROM `table` WHERE `key`=?}
	 */
	protected String genDeleteTableSql(String tableName) {
		String keyColumn = tables.get(tableName).getKey().getName();
		return "DELETE FROM `" + tableName + "` WHERE `" + keyColumn + "`=?";
	}
	
	/**
	 * Builds the statement that selects the largest key value stored in a table.
	 *
	 * @param tableName table to query; its metadata must be present in {@code tables}
	 * @return SQL of the form {@code SELECT MAX(`key`) FROM `table`}
	 */
	protected String genMaxIdSql(String tableName) {
		String keyColumn = tables.get(tableName).getKey().getName();
		return "SELECT MAX(`" + keyColumn + "`) FROM `" + tableName + "`";
	}
	
	/**
	 * Deletes the row with the given key, using a connection borrowed from the pool for the
	 * duration of the call.
	 *
	 * @param tableName table to delete from
	 * @param id        key value of the row to delete
	 * @throws DCacheException wrapping any failure to obtain the connection or run the delete
	 */
	@Override
	protected void deleteDbValue(String tableName, long id) throws DCacheException {
		Connection conn = null;
		try {
			conn = pool.getConnection();
			deleteMySqlValue(tableName, id, conn);
		} catch (Exception e) {
			// The logger already records the stack trace; no second copy on stderr.
			logger.error(e.getMessage(), e);
			throw new DCacheException(e);
		} finally {
			DaoUtil.close(conn);
		}
	}
	
	/**
	 * Deletes the row with the given key on the supplied connection.
	 *
	 * <p>A delete that affects no row is logged as an error but is not treated as a failure.
	 *
	 * @param tableName table to delete from
	 * @param id        key value of the row to delete
	 * @param conn      open connection to run the statement on; it is not closed here
	 * @throws DCacheException wrapping any failure while preparing or executing the statement
	 */
	protected void deleteMySqlValue(String tableName, long id,Connection conn) throws DCacheException {
		String sql = genDeleteTableSql(tableName);
		PreparedStatement ps = null;
		try {
			ps = conn.prepareStatement(sql);
			ps.setLong(1, id);
			int affected = ps.executeUpdate();
			if (affected == 0) {
				logger.error("删除数据影响行数为0 id:"+id+" tablename:"+tableName);
			}
		} catch (Exception e) {
			// The logger already records the stack trace; no second copy on stderr.
			logger.error(e.getMessage(), e);
			throw new DCacheException(e);
		} finally {
			DaoUtil.close(ps);
		}
	}
	
	/**
	 * Loads the row with the given key and returns its converted representation.
	 *
	 * <p>The row is selected with the statement from {@code getSelectSql} and converted by
	 * {@code converResult2Json}; only the first converted entry is returned. Every call
	 * increments {@code getFromDBTimes}, and a per-table counter in {@code get} is bumped
	 * as well while debug logging is enabled.
	 *
	 * @param tableName table to read from
	 * @param id        key value of the row to load
	 * @return the first converted row, or {@code null} when the conversion produced no entry
	 * @throws DCacheException wrapping any failure while querying or converting
	 */
	protected String getDbValue(String tableName,long id) throws DCacheException {
		Connection conn = null;
		PreparedStatement ps = null;
		ResultSet rs = null;
		try {
			conn = pool.getConnection();
			ps = conn.prepareStatement(getSelectSql(tableName));
			ps.setLong(1, id);
			rs = ps.executeQuery();
			List<String> rows = converResult2Json(tableName, rs);
			String result = rows.size() > 0 ? rows.get(0) : null;
			getFromDBTimes.getAndAdd(1);
			if (logger.isDebugEnabled()) {
				// putIfAbsent hands back the counter another thread registered first, if any.
				AtomicLong counter = new AtomicLong();
				AtomicLong existing = get.putIfAbsent(tableName, counter);
				if (existing != null) {
					counter = existing;
				}
				counter.addAndGet(1);
			}
			return result;
		} catch (Exception e) {
			// The logger already records the stack trace; no second copy on stderr.
			logger.error(e.getMessage(), e);
			throw new DCacheException(e);
		} finally {
			DaoUtil.close(rs);
			DaoUtil.close(ps);
			DaoUtil.close(conn);
		}
	}
	
	 /**
	  * Deletes the rows with the given keys from a table using JDBC batching.
	  *
	  * <p>The batch is flushed every {@code BATCH_EXECUTE_SIZE} keys and once more after the
	  * last key so that a trailing partial batch is sent too.
	  *
	  * @param tableName table to delete from
	  * @param keyList   key values of the rows to delete
	  * @throws DCacheException wrapping any failure to obtain the connection or run the batch
	  */
	@Override
	protected void deleteDbBatch(String tableName,List<Long> keyList) throws DCacheException {
		String sql = genDeleteTableSql(tableName);
		Connection conn = null;
		PreparedStatement ps = null;
		try {
			conn = pool.getConnection();
			ps = conn.prepareStatement(sql);
			int queued = 0;
			for (long key : keyList) {
				ps.setLong(1, key);
				ps.addBatch();
				queued++;
				if (queued % BATCH_EXECUTE_SIZE == 0) {
					ps.executeBatch();
				}
			}
			ps.executeBatch();
		} catch (Exception e) {
			// Report through the logger like the other database methods instead of stderr only.
			logger.error(e.getMessage(), e);
			throw new DCacheException(e);
		} finally {
			// Single close point for the statement; it is not closed inside the try as well.
			DaoUtil.close(ps);
			DaoUtil.close(conn);
		}
	}
	 
	/**
	 * Builds the parameterized statement that selects one row of a table by its key column.
	 *
	 * <p>All columns known for the table are listed, except the one whose name equals
	 * {@code ColumnInfo.LAST_TIME}.
	 *
	 * @param tableName table to query; its metadata must be present in {@code tables}
	 * @return SQL of the form {@code SELECT `c1`,`c2` FROM `table` WHERE `key` = ?}
	 */
	protected String getSelectSql(String tableName) {
		StringBuilder sql = new StringBuilder("SELECT ");
		for (ColumnInfo column : tables.get(tableName).getColInfos()) {
			String name = column.getName();
			if (name.equals(ColumnInfo.LAST_TIME.getName())) {
				continue;
			}
			sql.append("`").append(name).append("`,");
		}
		// Cut the last character, i.e. the comma left behind by the final column.
		sql.setLength(sql.length() - 1);
		sql.append(" FROM `").append(tableName).append("` WHERE `");
		sql.append(tables.get(tableName).getKey().getName());
		sql.append("` = ?");
		return sql.toString();
	}
	
	/**
	 * Converts the rows of a result set into strings, one list entry per converted row.
	 * NOTE(review): the name suggests each entry is a JSON document - confirm against implementations.
	 *
	 * @param tableName table the result set was selected from
	 * @param rs        result set to read; {@code getDbValue} closes it after this call returns
	 * @return converted rows; {@code getDbValue} uses only the first entry
	 * @throws DCacheException if the conversion fails
	 */
	protected abstract List<String> converResult2Json(String tableName,ResultSet rs) throws DCacheException;
	
	/**
	 * Writes a record to its table, using a connection borrowed from the pool for the
	 * duration of the call.
	 *
	 * @param tableName target table
	 * @param changed   column name to value pairs to write, in insertion order
	 * @throws DCacheException if the connection cannot be obtained or the write fails
	 */
	@Override
	protected void putDbValue(String tableName,LinkedHashMap<String,String> changed) throws DCacheException {
		Connection conn = null;
		try {
			conn = pool.getConnection();
			putMySqlValue(tableName, changed, conn);
		} catch (SQLException e) {
			// The logger already records the stack trace; no second copy on stderr.
			logger.error(e.getMessage(), e);
			throw new DCacheException(e);
		} finally {
			DaoUtil.close(conn);
		}
	}
	
	/**
	 * Inserts a record on the supplied connection, updating the existing row when the key is
	 * already present (statement built by {@code genInsertTableSql}).
	 *
	 * <p>Values are bound as strings in the iteration order of {@code changed}, which is the
	 * order the statement lists its columns in. A value that is {@code null} or equal to
	 * {@code CacheClient.CACHE_RECORD_NULL_VALUE} is bound as SQL NULL.
	 *
	 * @param tableName target table
	 * @param changed   column name to value pairs to write, in insertion order
	 * @param conn      open connection to run the statement on; it is not closed here
	 * @throws DCacheException wrapping any SQL failure
	 */
	private void putMySqlValue(String tableName,LinkedHashMap<String,String> changed,Connection conn) throws DCacheException {
		String sql = genInsertTableSql(tableName, changed);
		PreparedStatement ps = null;
		try {
			ps = conn.prepareStatement(sql);
			int index = 1;
			for (String raw : changed.values()) {
				// Check for null first so a missing value is stored as NULL instead of raising an NPE.
				boolean storeNull = raw == null || raw.equals(CacheClient.CACHE_RECORD_NULL_VALUE);
				ps.setString(index, storeNull ? null : raw);
				index++;
			}
			ps.executeUpdate();
		} catch (SQLException e) {
			// The logger already records the stack trace; no second copy on stderr.
			logger.error(e.getMessage(), e);
			throw new DCacheException(e);
		} finally {
			DaoUtil.close(ps);
		}
	}
	
	 /**
	  * Builds the parameterized {@code INSERT ... ON DUPLICATE KEY UPDATE} statement for a record.
	  *
	  * <p>Every entry of {@code changed} contributes one column and one {@code ?} placeholder,
	  * in iteration order. Every column other than the table's key column is also assigned
	  * {@code VALUES(column)} in the update clause. When only the key column is supplied, the
	  * update clause assigns the key to itself so the statement stays valid and changes nothing
	  * on a duplicate.
	  *
	  * @param tableName target table; its metadata must be present in {@code tables}
	  * @param changed   column name to value pairs; only the keys are used here
	  * @return SQL of the form
	  *         {@code INSERT INTO `t` (`k`,`a`) VALUES(?,?) ON DUPLICATE KEY UPDATE `a`=VALUES(`a`)}
	  * @throws IllegalArgumentException if {@code changed} is {@code null} or empty
	  */
	protected String genInsertTableSql(String tableName,LinkedHashMap<String,String> changed) {
		if (changed == null || changed.isEmpty()) {
			throw new IllegalArgumentException("no columns to insert for table " + tableName);
		}
		String keyName = tables.get(tableName).getKey().getName();

		StringBuilder columns = new StringBuilder();
		StringBuilder placeholders = new StringBuilder();
		StringBuilder updates = new StringBuilder();
		for (String col : changed.keySet()) {
			if (columns.length() > 0) {
				columns.append(",");
				placeholders.append(",");
			}
			columns.append("`").append(col).append("`");
			placeholders.append("?");
			if (!col.equals(keyName)) {
				if (updates.length() > 0) {
					updates.append(",");
				}
				// Quote the assigned column too, so reserved words work as column names.
				updates.append("`").append(col).append("`=VALUES(`").append(col).append("`)");
			}
		}
		if (updates.length() == 0) {
			// Only the key was supplied: a self-assignment keeps the update clause non-empty.
			updates.append("`").append(keyName).append("`=`").append(keyName).append("`");
		}

		StringBuilder sql = new StringBuilder();
		sql.append("INSERT INTO `").append(tableName).append("` (");
		sql.append(columns);
		sql.append(") VALUES(");
		sql.append(placeholders);
		sql.append(") ON DUPLICATE KEY UPDATE ");
		sql.append(updates);
		return sql.toString();
	}
	
	/**
	 * Picks the attribute a condition operates on: the filter attribute when it is set,
	 * otherwise the order attribute.
	 *
	 * @param condition query condition to inspect
	 * @return the filter attribute if it is neither null nor empty, else the order attribute
	 */
	protected String genQueryValueName(Condition condition) {
		boolean noFilter = StringUtils.isNullOrEmpty(condition.getFilterAttr());
		return noFilter ? condition.getOrderAttr() : condition.getFilterAttr();
	}
	
	/**
	 * Looks up the name of a table's key column.
	 *
	 * @param tableName table whose metadata must be present in {@code tables}
	 * @return name of the key column
	 */
	protected String genQueryKeyName(String tableName) {
		TableInfo table = tables.get(tableName);
		return table.getKey().getName();
	}
	
	/**
	 * Runs an index query and resolves the matching keys to their entities.
	 *
	 * @param tableName table to query
	 * @param condition filter, ordering and paging of the query
	 * @return the entities returned by {@code getEntityList} for the matching keys;
	 *         empty when the index query matched nothing
	 * @throws DCacheException if the index query or the entity lookup fails
	 */
	@Override
	public List<String> queryEntityList(String tableName, Condition condition)
			throws DCacheException {
		List<IndexEntry> entries = queryIndexList(tableName, condition);
		List<String> entities = new ArrayList<String>();
		if (entries.size() <= 0) {
			return entities;
		}
		List<Long> ids = new ArrayList<Long>();
		for (IndexEntry entry : entries) {
			ids.add(entry.getId());
		}
		entities.addAll(this.getEntityList(tableName, ids).values());
		return entities;
	}
	
	/**
	 * Counts the rows of a table with {@code SELECT COUNT(`key`) FROM `table`}.
	 *
	 * @param tableName table to count; its metadata must be present in {@code tables}
	 * @return the count reported by the database, or 0 when the query returned no row
	 * @throws DCacheException wrapping any failure while building or running the query
	 */
	@Override
	public long countEntities(String tableName)throws DCacheException {
		Connection conn = null;
		PreparedStatement ps = null;
		ResultSet rs = null;
		long count = 0;
		StringBuilder sql = new StringBuilder();
		try {
			conn = pool.getConnection();
			sql.append("SELECT COUNT(`");
			sql.append(tables.get(tableName).getKey().getName());
			sql.append("`) FROM `");
			sql.append(tableName);
			sql.append("`");
			ps = conn.prepareStatement(sql.toString());
			rs = ps.executeQuery();
			// An aggregate without GROUP BY yields a single row, so one read is enough.
			if (rs.next()) {
				count = rs.getLong(1);
			}
		} catch (Exception e) {
			// The logger already records the stack trace; no second copy on stderr.
			logger.error(e.getMessage() +" sql:"+sql, e);
			throw new DCacheException(e);
		} finally {
			DaoUtil.close(rs);
			DaoUtil.close(ps);
			DaoUtil.close(conn);
		}
		return count;
	}
	
	/**
	 * Queries (key, attribute value) pairs of a table according to a condition.
	 *
	 * <p>The selected attribute is the filter attribute when one is set, otherwise the order
	 * attribute; the same attribute is used for the optional WHERE clause and the optional
	 * ORDER BY. An order direction of {@code "+"} sorts ascending, {@code "-"} descending,
	 * anything else (including {@code null}) leaves the result unordered. At most 100 rows are
	 * returned per call, starting at the condition's limit offset.
	 *
	 * <p>When the filter type is {@code "string"} the filter value is bound as a statement
	 * parameter, so quotes inside the value neither break nor alter the query.
	 *
	 * @param tableName table to query; its metadata must be present in {@code tables}
	 * @param condition filter, ordering and paging of the query
	 * @return one entry per matching row carrying the key and the attribute value
	 * @throws DCacheException if neither a filter nor an order attribute is set, or wrapping
	 *                         any failure while building or running the query
	 */
	@Override
	public List<IndexEntry> queryIndexList(String tableName, Condition condition) throws DCacheException {
		if(StringUtils.isNullOrEmpty(condition.getFilterAttr()) && StringUtils.isNullOrEmpty(condition.getOrderAttr())) {
			throw new DCacheException("filterAttr and orderAttr is null or empty");
		}

		Connection conn = null;
		PreparedStatement ps = null;
		ResultSet rs = null;
		List<IndexEntry> ret = new ArrayList<IndexEntry>();
		StringBuilder sql = new StringBuilder();
		try {
			conn = pool.getConnection();
			String fieldName = genQueryValueName(condition);
			sql.append("SELECT `").append(genQueryKeyName(tableName)).append("`,`");
			sql.append(fieldName).append("` FROM `").append(tableName).append("`");

			boolean bindFilterValue = false;
			if (!StringUtils.isNullOrEmpty(condition.getFilterAttr())) {
				sql.append(" where `").append(fieldName).append("`");
				// NOTE(review): the operator is still concatenated verbatim - restrict it to a
				// known set of operators if conditions can originate from untrusted callers.
				sql.append(condition.getFilterOperator());
				if ("string".equalsIgnoreCase(condition.getFilterType())) {
					// Bound below instead of being spliced between quotes.
					sql.append("?");
					bindFilterValue = true;
				} else {
					// NOTE(review): non-string values are still concatenated verbatim because
					// their SQL type is unknown here - validate them upstream.
					sql.append(condition.getFilterValue());
				}
			}

			// A missing order direction means "no ordering" rather than a NullPointerException.
			String orderDir = condition.getOrderDir() == null ? "" : condition.getOrderDir().trim();
			if (orderDir.equals("+")) {
				sql.append(" ORDER BY `").append(fieldName).append("`");
			} else if (orderDir.equals("-")) {
				sql.append(" ORDER BY `").append(fieldName).append("` DESC");
			}

			// Page size is capped at 100 rows.
			sql.append(" LIMIT ");
			sql.append(condition.getLimitOffset());
			sql.append(",");
			sql.append(Math.min(condition.getLimitRange(), 100));

			ps = conn.prepareStatement(sql.toString());
			if (bindFilterValue) {
				ps.setString(1, String.valueOf(condition.getFilterValue()));
			}
			rs = ps.executeQuery();
			while (rs.next()) {
				IndexEntry ie = new IndexEntry();
				ie.setId(rs.getLong(1));
				ie.setValue(rs.getObject(2));
				ret.add(ie);
			}
		} catch (Exception e) {
			// The logger already records the stack trace; no second copy on stderr.
			logger.error(e.getMessage()+" sql:"+sql, e);
			throw new DCacheException(e);
		} finally {
			DaoUtil.close(rs);
			DaoUtil.close(ps);
			DaoUtil.close(conn);
		}
		return ret;
	}
	
	/**
	 * Counts the rows of a table that match the filter of a condition.
	 *
	 * <p>The count is taken over the attribute chosen by {@code genQueryValueName}. The WHERE
	 * clause is only added when the filter attribute is not blank. When the filter type is
	 * {@code "string"} the filter value is bound as a statement parameter, so quotes inside
	 * the value neither break nor alter the query.
	 *
	 * @param tableName table to count in
	 * @param condition condition carrying the filter attribute, operator, type and value
	 * @return the count reported by the database, or 0 when the query returned no row
	 * @throws DCacheException if the filter attribute is null or empty, or wrapping any
	 *                         failure while building or running the query
	 */
	@Override
	public long countWithCondition(String tableName, Condition condition)
			throws DCacheException {
		if(StringUtils.isNullOrEmpty(condition.getFilterAttr())) {
			throw new DCacheException("filterAttr is null or empty");
		}
		Connection conn = null;
		PreparedStatement ps = null;
		ResultSet rs = null;
		long count = 0;
		StringBuilder sql = new StringBuilder();
		try {
			String fieldName = genQueryValueName(condition);
			conn = pool.getConnection();
			sql.append("SELECT COUNT(`").append(fieldName).append("`) FROM `");
			sql.append(tableName).append("`");

			boolean bindFilterValue = false;
			if (condition.getFilterAttr().trim().length() != 0) {
				sql.append(" WHERE `").append(fieldName).append("`");
				// NOTE(review): the operator is still concatenated verbatim - restrict it to a
				// known set of operators if conditions can originate from untrusted callers.
				sql.append(condition.getFilterOperator());
				if ("string".equalsIgnoreCase(condition.getFilterType())) {
					// Bound below instead of being spliced between quotes.
					sql.append("?");
					bindFilterValue = true;
				} else {
					// NOTE(review): non-string values are still concatenated verbatim because
					// their SQL type is unknown here - validate them upstream.
					sql.append(condition.getFilterValue());
				}
			}

			ps = conn.prepareStatement(sql.toString());
			if (bindFilterValue) {
				ps.setString(1, String.valueOf(condition.getFilterValue()));
			}
			rs = ps.executeQuery();
			// An aggregate without GROUP BY yields a single row, so one read is enough.
			if (rs.next()) {
				count = rs.getLong(1);
			}
		} catch (Exception e) {
			// The logger already records the stack trace; no second copy on stderr.
			logger.error("countWithCondition sql:"+sql);
			logger.error(e.getMessage(), e);
			throw new DCacheException(e);
		} finally {
			DaoUtil.close(rs);
			DaoUtil.close(ps);
			DaoUtil.close(conn);
		}
		return count;
	}
	
	/**
	 * Returns the largest key stored in a table, but never less than the configured start number.
	 *
	 * <p>The key is read with the statement from {@code genMaxIdSql}; when the query yields no
	 * row the database maximum counts as 0. The result is the larger of that maximum and
	 * {@code DCacheConfig.getInstance().getTableStartNum()}.
	 *
	 * @param tableName table to query; its metadata must be present in {@code tables}
	 * @return the larger of the stored maximum key and the configured table start number
	 * @throws DCacheException wrapping any failure to obtain the connection or run the query
	 */
	@Override
	public long getMaxEntityId(String tableName) throws DCacheException {
		String sql = genMaxIdSql(tableName);
		long maxId = 0;
		Connection conn = null;
		PreparedStatement ps = null;
		ResultSet rs = null;
		try {
			conn = pool.getConnection();
			ps = conn.prepareStatement(sql);
			rs = ps.executeQuery();
			if (rs.next()) {
				maxId = rs.getLong(1);
			}
		} catch (Exception e) {
			// The logger already records the stack trace; no second copy on stderr.
			logger.error(e.getMessage(), e);
			throw new DCacheException(e);
		} finally {
			DaoUtil.close(rs);
			DaoUtil.close(ps);
			DaoUtil.close(conn);
		}
		return Math.max(DCacheConfig.getInstance().getTableStartNum(), maxId);
	}
}
